package com.markhsiu.minimq.broker.task;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.markhsiu.minimq.broker.BrokerFactory;
import com.markhsiu.minimq.broker.store.StoreManager;
import com.markhsiu.minimq.channel.ChannelProcessor;
import com.markhsiu.minimq.message.Message;
import com.markhsiu.minimq.message.constant.MessageCmdEnum;
import com.markhsiu.minimq.message.constant.MessageSourceEnum;

public class MessagePushTask extends Thread{
	
	private static Logger logger = LoggerFactory.getLogger(MessagePushTask.class);
	
	private BrokerFactory broker = BrokerFactory.instance();
	
	public  MessagePushTask() {
		super("MessagePushTask-thread");
	}
	
	@Override
	public void run() {
		logger.info(" .... Message Push Task start ..... ");
		while(true){
			Map<String,Map<String,String>> consumers  = broker.getAllConsumers();
			
			for(Map.Entry<String, Map<String,String>> consumer : consumers.entrySet()){
				ChannelProcessor<Message> channel = broker.getConsumerConnection(consumer.getKey());
				for (String topic : consumer.getValue().keySet()) {
					Message message = StoreManager.pollMessage(topic);
					if(message != null){
						message.setSource(MessageSourceEnum.BROKER);
						message.setTarget(MessageSourceEnum.CUSTOMER);
						message.setCmd(MessageCmdEnum.NEW);
						channel.writeAndFlush(message);
					}
				}
			}
		}
	}

}
